1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.solr.handler;
18  
19  import java.io.File;
20  import java.io.FileInputStream;
21  import java.io.FileNotFoundException;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.InputStreamReader;
25  import java.io.OutputStream;
26  import java.lang.invoke.MethodHandles;
27  import java.nio.ByteBuffer;
28  import java.nio.channels.FileChannel;
29  import java.nio.charset.StandardCharsets;
30  import java.nio.file.NoSuchFileException;
31  import java.util.ArrayList;
32  import java.util.Arrays;
33  import java.util.Collection;
34  import java.util.Collections;
35  import java.util.Date;
36  import java.util.HashMap;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.Properties;
40  import java.util.Random;
41  import java.util.concurrent.ExecutorService;
42  import java.util.concurrent.Executors;
43  import java.util.concurrent.Future;
44  import java.util.concurrent.ScheduledExecutorService;
45  import java.util.concurrent.ScheduledFuture;
46  import java.util.concurrent.TimeUnit;
47  import java.util.concurrent.atomic.AtomicBoolean;
48  import java.util.concurrent.locks.ReentrantLock;
49  import java.util.regex.Matcher;
50  import java.util.regex.Pattern;
51  import java.util.zip.Adler32;
52  import java.util.zip.Checksum;
53  import java.util.zip.DeflaterOutputStream;
54  
55  import org.apache.commons.io.IOUtils;
56  import org.apache.lucene.codecs.CodecUtil;
57  import org.apache.lucene.index.DirectoryReader;
58  import org.apache.lucene.index.IndexCommit;
59  import org.apache.lucene.index.IndexDeletionPolicy;
60  import org.apache.lucene.index.IndexWriter;
61  import org.apache.lucene.index.SegmentCommitInfo;
62  import org.apache.lucene.index.SegmentInfos;
63  import org.apache.lucene.store.Directory;
64  import org.apache.lucene.store.IOContext;
65  import org.apache.lucene.store.IndexInput;
66  import org.apache.lucene.store.RateLimiter;
67  import org.apache.lucene.util.Version;
68  import org.apache.solr.common.SolrException;
69  import org.apache.solr.common.SolrException.ErrorCode;
70  import org.apache.solr.common.params.CommonParams;
71  import org.apache.solr.common.params.ModifiableSolrParams;
72  import org.apache.solr.common.params.SolrParams;
73  import org.apache.solr.common.util.ExecutorUtil;
74  import org.apache.solr.common.util.FastOutputStream;
75  import org.apache.solr.common.util.NamedList;
76  import org.apache.solr.common.util.SimpleOrderedMap;
77  import org.apache.solr.common.util.StrUtils;
78  import org.apache.solr.common.util.SuppressForbidden;
79  import org.apache.solr.core.CloseHook;
80  import org.apache.solr.core.DirectoryFactory;
81  import org.apache.solr.core.DirectoryFactory.DirContext;
82  import org.apache.solr.core.IndexDeletionPolicyWrapper;
83  import org.apache.solr.core.SolrCore;
84  import org.apache.solr.core.SolrDeletionPolicy;
85  import org.apache.solr.core.SolrEventListener;
86  import org.apache.solr.request.SolrQueryRequest;
87  import org.apache.solr.response.SolrQueryResponse;
88  import org.apache.solr.search.SolrIndexSearcher;
89  import org.apache.solr.update.SolrIndexWriter;
90  import org.apache.solr.util.DefaultSolrThreadFactory;
91  import org.apache.solr.util.NumberUtils;
92  import org.apache.solr.util.PropertiesInputStream;
93  import org.apache.solr.util.RefCounted;
94  import org.apache.solr.util.plugin.SolrCoreAware;
95  import org.slf4j.Logger;
96  import org.slf4j.LoggerFactory;
97  import org.slf4j.MDC;
98  
99  import static org.apache.solr.common.params.CommonParams.NAME;
100 
101 /**
102  * <p> A Handler which provides a REST API for replication and serves replication requests from Slaves. </p>
103  * <p>When running on the master, it provides the following commands <ol> <li>Get the current replicable index version
104  * (command=indexversion)</li> <li>Get the list of files for a given index version
105  * (command=filelist&amp;indexversion=&lt;VERSION&gt;)</li> <li>Get full or a part (chunk) of a given index or a config
106  * file (command=filecontent&amp;file=&lt;FILE_NAME&gt;) You can optionally specify an offset and length to get that
107  * chunk of the file. You can request a configuration file by using "cf" parameter instead of the "file" parameter.</li>
108  * <li>Get status/statistics (command=details)</li> </ol> <p>When running on the slave, it provides the following
109  * commands <ol> <li>Perform an index fetch now (command=snappull)</li> <li>Get status/statistics (command=details)</li>
110  * <li>Abort an index fetch (command=abort)</li> <li>Enable/Disable polling the master for new versions (command=enablepoll
111  * or command=disablepoll)</li> </ol>
112  *
113  *
114  * @since solr 1.4
115  */
116 public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
117 
118   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
119   SolrCore core;
120   
121   private volatile boolean closed = false;
122 
123   private static final class CommitVersionInfo {
124     public final long version;
125     public final long generation;
126     private CommitVersionInfo(long g, long v) {
127       generation = g;
128       version = v;
129     }
130     /**
131      * builds a CommitVersionInfo data for the specified IndexCommit.  
132      * Will never be null, ut version and generation may be zero if 
133      * there are problems extracting them from the commit data
134      */
135     public static CommitVersionInfo build(IndexCommit commit) {
136       long generation = commit.getGeneration();
137       long version = 0;
138       try {
139         final Map<String,String> commitData = commit.getUserData();
140         String commitTime = commitData.get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
141         if (commitTime != null) {
142           try {
143             version = Long.parseLong(commitTime);
144           } catch (NumberFormatException e) {
145             LOG.warn("Version in commitData was not formated correctly: " + commitTime, e);
146           }
147         }
148       } catch (IOException e) {
149         LOG.warn("Unable to get version from commitData, commit: " + commit, e);
150       }
151       return new CommitVersionInfo(generation, version);
152     }
153   }
154 
155   private IndexFetcher pollingIndexFetcher;
156 
157   private ReentrantLock indexFetchLock = new ReentrantLock();
158 
159   private ExecutorService restoreExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(
160       new DefaultSolrThreadFactory("restoreExecutor"));
161 
162   private volatile Future<Boolean> restoreFuture;
163 
164   private volatile String currentRestoreName;
165 
166   private String includeConfFiles;
167 
168   private NamedList<String> confFileNameAlias = new NamedList<>();
169 
170   private boolean isMaster = false;
171 
172   private boolean isSlave = false;
173 
174   private boolean replicateOnOptimize = false;
175 
176   private boolean replicateOnCommit = false;
177 
178   private boolean replicateOnStart = false;
179 
180   private ScheduledExecutorService executorService;
181 
182   private volatile long executorStartTime;
183 
184   private int numberBackupsToKeep = 0; //zero: do not delete old backups
185 
186   private int numTimesReplicated = 0;
187 
188   private final Map<String, FileInfo> confFileInfoCache = new HashMap<>();
189 
190   private Integer reserveCommitDuration = readIntervalMs("00:00:10");
191 
192   volatile IndexCommit indexCommitPoint;
193 
194   volatile NamedList<Object> snapShootDetails;
195 
196   private AtomicBoolean replicationEnabled = new AtomicBoolean(true);
197 
198   private Long pollIntervalNs;
199   private String pollIntervalStr;
200 
201   /**
202    * Disable the timer task for polling
203    */
204   private AtomicBoolean pollDisabled = new AtomicBoolean(false);
205 
206   String getPollInterval() {
207     return pollIntervalStr;
208   }
209 
210   @Override
211   public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
212     rsp.setHttpCaching(false);
213     final SolrParams solrParams = req.getParams();
214     String command = solrParams.get(COMMAND);
215     if (command == null) {
216       rsp.add(STATUS, OK_STATUS);
217       rsp.add("message", "No command");
218       return;
219     }
220     // This command does not give the current index version of the master
221     // It gives the current 'replicateable' index version
222     if (command.equals(CMD_INDEX_VERSION)) {
223       IndexCommit commitPoint = indexCommitPoint;  // make a copy so it won't change
224 
225       if (commitPoint == null) {
226         // if this handler is 'lazy', we may not have tracked the last commit
227         // because our commit listener is registered on inform
228         commitPoint = core.getDeletionPolicy().getLatestCommit();
229       }
230 
231       if (commitPoint != null && replicationEnabled.get()) {
232         //
233         // There is a race condition here.  The commit point may be changed / deleted by the time
234         // we get around to reserving it.  This is a very small window though, and should not result
235         // in a catastrophic failure, but will result in the client getting an empty file list for
236         // the CMD_GET_FILE_LIST command.
237         //
238         core.getDeletionPolicy().setReserveDuration(commitPoint.getGeneration(), reserveCommitDuration);
239         rsp.add(CMD_INDEX_VERSION, IndexDeletionPolicyWrapper.getCommitTimestamp(commitPoint));
240         rsp.add(GENERATION, commitPoint.getGeneration());
241       } else {
242         // This happens when replication is not configured to happen after startup and no commit/optimize
243         // has happened yet.
244         rsp.add(CMD_INDEX_VERSION, 0L);
245         rsp.add(GENERATION, 0L);
246       }
247     } else if (command.equals(CMD_GET_FILE)) {
248       getFileStream(solrParams, rsp);
249     } else if (command.equals(CMD_GET_FILE_LIST)) {
250       getFileList(solrParams, rsp);
251     } else if (command.equalsIgnoreCase(CMD_BACKUP)) {
252       doSnapShoot(new ModifiableSolrParams(solrParams), rsp, req);
253       rsp.add(STATUS, OK_STATUS);
254     } else if (command.equalsIgnoreCase(CMD_RESTORE)) {
255       restore(new ModifiableSolrParams(solrParams), rsp, req);
256       rsp.add(STATUS, OK_STATUS);
257     } else if (command.equalsIgnoreCase(CMD_RESTORE_STATUS)) {
258       rsp.add(CMD_RESTORE_STATUS, getRestoreStatus());
259     } else if (command.equalsIgnoreCase(CMD_DELETE_BACKUP)) {
260       deleteSnapshot(new ModifiableSolrParams(solrParams));
261       rsp.add(STATUS, OK_STATUS);
262     } else if (command.equalsIgnoreCase(CMD_FETCH_INDEX)) {
263       String masterUrl = solrParams.get(MASTER_URL);
264       if (!isSlave && masterUrl == null) {
265         rsp.add(STATUS,ERR_STATUS);
266         rsp.add("message","No slave configured or no 'masterUrl' Specified");
267         return;
268       }
269       final SolrParams paramsCopy = new ModifiableSolrParams(solrParams);
270       Thread fetchThread = new Thread("explicit-fetchindex-cmd") {
271         @Override
272         public void run() {
273           doFetch(paramsCopy, false);
274         }
275       };
276       fetchThread.setDaemon(false);
277       fetchThread.start();
278       if (solrParams.getBool(WAIT, false)) {
279         fetchThread.join();
280       }
281       rsp.add(STATUS, OK_STATUS);
282     } else if (command.equalsIgnoreCase(CMD_DISABLE_POLL)) {
283       if (pollingIndexFetcher != null){
284         disablePoll();
285         rsp.add(STATUS, OK_STATUS);
286       } else {
287         rsp.add(STATUS, ERR_STATUS);
288         rsp.add("message","No slave configured");
289       }
290     } else if (command.equalsIgnoreCase(CMD_ENABLE_POLL)) {
291       if (pollingIndexFetcher != null){
292         enablePoll();
293         rsp.add(STATUS, OK_STATUS);
294       }else {
295         rsp.add(STATUS,ERR_STATUS);
296         rsp.add("message","No slave configured");
297       }
298     } else if (command.equalsIgnoreCase(CMD_ABORT_FETCH)) {
299       IndexFetcher fetcher = currentIndexFetcher;
300       if (fetcher != null){
301         fetcher.abortFetch();
302         rsp.add(STATUS, OK_STATUS);
303       } else {
304         rsp.add(STATUS,ERR_STATUS);
305         rsp.add("message","No slave configured");
306       }
307     } else if (command.equals(CMD_SHOW_COMMITS)) {
308       rsp.add(CMD_SHOW_COMMITS, getCommits());
309     } else if (command.equals(CMD_DETAILS)) {
310       rsp.add(CMD_DETAILS, getReplicationDetails(solrParams.getBool("slave", true)));
311     } else if (CMD_ENABLE_REPL.equalsIgnoreCase(command)) {
312       replicationEnabled.set(true);
313       rsp.add(STATUS, OK_STATUS);
314     } else if (CMD_DISABLE_REPL.equalsIgnoreCase(command)) {
315       replicationEnabled.set(false);
316       rsp.add(STATUS, OK_STATUS);
317     }
318   }
319 
320   private void deleteSnapshot(ModifiableSolrParams params) {
321     String name = params.get(NAME);
322     if(name == null) {
323       throw new SolrException(ErrorCode.BAD_REQUEST, "Missing mandatory param: name");
324     }
325 
326     SnapShooter snapShooter = new SnapShooter(core, params.get(LOCATION), params.get(NAME));
327     snapShooter.validateDeleteSnapshot();
328     snapShooter.deleteSnapAsync(this);
329   }
330 
331   private List<NamedList<Object>> getCommits() {
332     Map<Long, IndexCommit> commits = core.getDeletionPolicy().getCommits();
333     List<NamedList<Object>> l = new ArrayList<>();
334 
335     for (IndexCommit c : commits.values()) {
336       try {
337         NamedList<Object> nl = new NamedList<>();
338         nl.add("indexVersion", IndexDeletionPolicyWrapper.getCommitTimestamp(c));
339         nl.add(GENERATION, c.getGeneration());
340         List<String> commitList = new ArrayList<>(c.getFileNames().size());
341         commitList.addAll(c.getFileNames());
342         Collections.sort(commitList);
343         nl.add(CMD_GET_FILE_LIST, commitList);
344         l.add(nl);
345       } catch (IOException e) {
346         LOG.warn("Exception while reading files for commit " + c, e);
347       }
348     }
349     return l;
350   }
351 
352   static Long getCheckSum(Checksum checksum, File f) {
353     FileInputStream fis = null;
354     checksum.reset();
355     byte[] buffer = new byte[1024 * 1024];
356     int bytesRead;
357     try {
358       fis = new FileInputStream(f);
359       while ((bytesRead = fis.read(buffer)) >= 0)
360         checksum.update(buffer, 0, bytesRead);
361       return checksum.getValue();
362     } catch (Exception e) {
363       LOG.warn("Exception in finding checksum of " + f, e);
364     } finally {
365       IOUtils.closeQuietly(fis);
366     }
367     return null;
368   }
369 
370   private volatile IndexFetcher currentIndexFetcher;
371 
372   public boolean doFetch(SolrParams solrParams, boolean forceReplication) {
373     String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
374     if (!indexFetchLock.tryLock())
375       return false;
376     try {
377       if (masterUrl != null) {
378         if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {
379           currentIndexFetcher.destroy();
380         }
381         currentIndexFetcher = new IndexFetcher(solrParams.toNamedList(), this, core);
382       } else {
383         currentIndexFetcher = pollingIndexFetcher;
384       }
385       return currentIndexFetcher.fetchLatestIndex(forceReplication);
386     } catch (Exception e) {
387       SolrException.log(LOG, "Index fetch failed ", e);
388     } finally {
389       if (pollingIndexFetcher != null) {
390         currentIndexFetcher = pollingIndexFetcher;
391       }
392       indexFetchLock.unlock();
393     }
394     return false;
395   }
396 
397   boolean isReplicating() {
398     return indexFetchLock.isLocked();
399   }
400 
401   private void restore(SolrParams params, SolrQueryResponse rsp, SolrQueryRequest req) {
402     if (restoreFuture != null && !restoreFuture.isDone()) {
403       throw new SolrException(ErrorCode.BAD_REQUEST, "Restore in progress. Cannot run multiple restore operations" +
404           "for the same core");
405     }
406     String name = params.get(NAME);
407     String location = params.get(LOCATION);
408 
409     //If location is not provided then assume that the restore index is present inside the data directory.
410     if (location == null) {
411       location = core.getDataDir();
412     }
413 
414     //If name is not provided then look for the last unnamed( the ones with the snapshot.timestamp format)
415     //snapshot folder since we allow snapshots to be taken without providing a name. Pick the latest timestamp.
416     if (name == null) {
417       File[] files = new File(location).listFiles();
418       List<OldBackupDirectory> dirs = new ArrayList<>();
419       for (File f : files) {
420         OldBackupDirectory obd = new OldBackupDirectory(f);
421         if (obd.dir != null) {
422           dirs.add(obd);
423         }
424       }
425       Collections.sort(dirs);
426       if (dirs.size() == 0) {
427         throw new SolrException(ErrorCode.BAD_REQUEST, "No backup name specified and none found in " + core.getDataDir());
428       }
429       name = dirs.get(0).dir.getName();
430     } else {
431       //"snapshot." is prefixed by snapshooter
432       name = "snapshot." + name;
433     }
434 
435     RestoreCore restoreCore = new RestoreCore(core, location, name);
436     try {
437       MDC.put("RestoreCore.core", core.getName());
438       MDC.put("RestoreCore.backupLocation", location);
439       MDC.put("RestoreCore.backupName", name);
440       restoreFuture = restoreExecutor.submit(restoreCore);
441       currentRestoreName = name;
442     } finally {
443       MDC.remove("RestoreCore.core");
444       MDC.remove("RestoreCore.backupLocation");
445       MDC.remove("RestoreCore.backupName");
446     }
447   }
448 
449   private NamedList<Object> getRestoreStatus() {
450     NamedList<Object> status = new SimpleOrderedMap<>();
451 
452     if (restoreFuture == null) {
453       status.add(STATUS, "No restore actions in progress");
454       return status;
455     }
456 
457     status.add("snapshotName", currentRestoreName);
458     if (restoreFuture.isDone()) {
459       try {
460         boolean success = restoreFuture.get();
461         if (success) {
462           status.add(STATUS, SUCCESS);
463         } else {
464           status.add(STATUS, FAILED);
465         }
466       } catch (Exception e) {
467         status.add(STATUS, FAILED);
468         status.add(EXCEPTION, e.getMessage());
469       }
470     } else {
471       status.add(STATUS, "In Progress");
472     }
473     return status;
474   }
475 
476   private void doSnapShoot(SolrParams params, SolrQueryResponse rsp,
477       SolrQueryRequest req) {
478     try {
479       int numberToKeep = params.getInt(NUMBER_BACKUPS_TO_KEEP_REQUEST_PARAM, 0);
480       if (numberToKeep > 0 && numberBackupsToKeep > 0) {
481         throw new SolrException(ErrorCode.BAD_REQUEST, "Cannot use "
482             + NUMBER_BACKUPS_TO_KEEP_REQUEST_PARAM + " if "
483             + NUMBER_BACKUPS_TO_KEEP_INIT_PARAM
484             + " was specified in the configuration.");
485       }
486       numberToKeep = Math.max(numberToKeep, numberBackupsToKeep);
487       if (numberToKeep < 1) {
488         numberToKeep = Integer.MAX_VALUE;
489       }
490 
491       IndexDeletionPolicyWrapper delPolicy = core.getDeletionPolicy();
492       IndexCommit indexCommit = delPolicy.getLatestCommit();
493 
494       if (indexCommit == null) {
495         indexCommit = req.getSearcher().getIndexReader().getIndexCommit();
496       }
497 
498       // small race here before the commit point is saved
499       SnapShooter snapShooter = new SnapShooter(core, params.get("location"), params.get(NAME));
500       snapShooter.validateCreateSnapshot();
501       snapShooter.createSnapAsync(indexCommit, numberToKeep, this);
502 
503     } catch (Exception e) {
504       LOG.warn("Exception during creating a snapshot", e);
505       rsp.add("exception", e);
506     }
507   }
508 
509   /**
510    * This method adds an Object of FileStream to the response . The FileStream implements a custom protocol which is
511    * understood by IndexFetcher.FileFetcher
512    *
513    * @see IndexFetcher.LocalFsFileFetcher
514    * @see IndexFetcher.DirectoryFileFetcher
515    */
516   private void getFileStream(SolrParams solrParams, SolrQueryResponse rsp) {
517     ModifiableSolrParams rawParams = new ModifiableSolrParams(solrParams);
518     rawParams.set(CommonParams.WT, FILE_STREAM);
519 
520     String cfileName = solrParams.get(CONF_FILE_SHORT);
521     if (cfileName != null) {
522       rsp.add(FILE_STREAM, new LocalFsFileStream(solrParams));
523     } else {
524       rsp.add(FILE_STREAM, new DirectoryFileStream(solrParams));
525     }
526   }
527 
528   @SuppressWarnings("unchecked")
529   private void getFileList(SolrParams solrParams, SolrQueryResponse rsp) {
530     String v = solrParams.get(GENERATION);
531     if (v == null) {
532       rsp.add("status", "no index generation specified");
533       return;
534     }
535     long gen = Long.parseLong(v);
536     IndexCommit commit = core.getDeletionPolicy().getCommitPoint(gen);
537 
538     //System.out.println("ask for files for gen:" + commit.getGeneration() + core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName());
539     if (commit == null) {
540       rsp.add("status", "invalid index generation");
541       return;
542     }
543     // reserve the indexcommit for sometime
544     core.getDeletionPolicy().setReserveDuration(gen, reserveCommitDuration);
545     List<Map<String, Object>> result = new ArrayList<>();
546     Directory dir = null;
547     Version version = null;
548     try {
549       dir = core.getDirectoryFactory().get(core.getNewIndexDir(), DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
550       SegmentInfos infos = SegmentInfos.readCommit(dir, commit.getSegmentsFileName());
551       Version oldestVersion = Version.LUCENE_CURRENT;
552       for (SegmentCommitInfo commitInfo : infos) {
553         version = commitInfo.info.getVersion();
554         if (oldestVersion.onOrAfter(version)) {
555           oldestVersion = version;
556         }
557         for (String file : commitInfo.files()) {
558           Map<String,Object> fileMeta = new HashMap<>();
559           fileMeta.put(NAME, file);
560           fileMeta.put(SIZE, dir.fileLength(file));
561 
562           try (final IndexInput in = dir.openInput(file, IOContext.READONCE)) {
563             if (version.onOrAfter(Version.LUCENE_4_8_0)) {
564               try {
565                 long checksum = CodecUtil.retrieveChecksum(in);
566                 fileMeta.put(CHECKSUM, checksum);
567               } catch(Exception e) {
568                 LOG.warn("Could not read checksum from index file: " + file, e);
569               }
570             }
571           }
572 
573           result.add(fileMeta);
574         }
575       }
576       
577       // add the segments_N file
578 
579       // we use the oldest version seen to determine
580       // how we treat the segments_N file - conservative, easy
581       
582       Map<String,Object> fileMeta = new HashMap<>();
583       fileMeta.put(NAME, infos.getSegmentsFileName());
584       fileMeta.put(SIZE, dir.fileLength(infos.getSegmentsFileName()));
585       if (infos.getId() != null) {
586         try (final IndexInput in = dir.openInput(infos.getSegmentsFileName(), IOContext.READONCE)) {
587           if (oldestVersion.onOrAfter(Version.LUCENE_4_8_0)) {
588             try {
589               fileMeta.put(CHECKSUM, CodecUtil.retrieveChecksum(in));
590             } catch(Exception e) {
591               LOG.warn("Could not read checksum from index file: " + infos.getSegmentsFileName(), e);
592             }
593           }
594         }
595       }
596       result.add(fileMeta);
597     } catch (IOException e) {
598       rsp.add("status", "unable to get file names for given index generation");
599       rsp.add(EXCEPTION, e);
600       LOG.error("Unable to get file names for indexCommit generation: " + gen, e);
601     } finally {
602       if (dir != null) {
603         try {
604           core.getDirectoryFactory().release(dir);
605         } catch (IOException e) {
606           SolrException.log(LOG, "Could not release directory after fetching file list", e);
607         }
608       }
609     }
610     rsp.add(CMD_GET_FILE_LIST, result);
611     if (confFileNameAlias.size() < 1 || core.getCoreDescriptor().getCoreContainer().isZooKeeperAware())
612       return;
613     LOG.debug("Adding config files to list: " + includeConfFiles);
614     //if configuration files need to be included get their details
615     rsp.add(CONF_FILES, getConfFileInfoFromCache(confFileNameAlias, confFileInfoCache));
616   }
617 
618   /**
619    * For configuration files, checksum of the file is included because, unlike index files, they may have same content
620    * but different timestamps.
621    * <p/>
622    * The local conf files information is cached so that everytime it does not have to compute the checksum. The cache is
623    * refreshed only if the lastModified of the file changes
624    */
625   List<Map<String, Object>> getConfFileInfoFromCache(NamedList<String> nameAndAlias,
626                                                      final Map<String, FileInfo> confFileInfoCache) {
627     List<Map<String, Object>> confFiles = new ArrayList<>();
628     synchronized (confFileInfoCache) {
629       File confDir = new File(core.getResourceLoader().getConfigDir());
630       Checksum checksum = null;
631       for (int i = 0; i < nameAndAlias.size(); i++) {
632         String cf = nameAndAlias.getName(i);
633         File f = new File(confDir, cf);
634         if (!f.exists() || f.isDirectory()) continue; //must not happen
635         FileInfo info = confFileInfoCache.get(cf);
636         if (info == null || info.lastmodified != f.lastModified() || info.size != f.length()) {
637           if (checksum == null) checksum = new Adler32();
638           info = new FileInfo(f.lastModified(), cf, f.length(), getCheckSum(checksum, f));
639           confFileInfoCache.put(cf, info);
640         }
641         Map<String, Object> m = info.getAsMap();
642         if (nameAndAlias.getVal(i) != null) m.put(ALIAS, nameAndAlias.getVal(i));
643         confFiles.add(m);
644       }
645     }
646     return confFiles;
647   }
648 
649   static class FileInfo {
650     long lastmodified;
651     String name;
652     long size;
653     long checksum;
654 
655     public FileInfo(long lasmodified, String name, long size, long checksum) {
656       this.lastmodified = lasmodified;
657       this.name = name;
658       this.size = size;
659       this.checksum = checksum;
660     }
661 
662     Map<String, Object> getAsMap() {
663       Map<String, Object> map = new HashMap<>();
664       map.put(NAME, name);
665       map.put(SIZE, size);
666       map.put(CHECKSUM, checksum);
667       return map;
668     }
669   }
670 
671   void disablePoll() {
672     if (isSlave) {
673       pollDisabled.set(true);
674       LOG.info("inside disable poll, value of pollDisabled = " + pollDisabled);
675     }
676   }
677 
678   void enablePoll() {
679     if (isSlave) {
680       pollDisabled.set(false);
681       LOG.info("inside enable poll, value of pollDisabled = " + pollDisabled);
682     }
683   }
684 
685   boolean isPollingDisabled() {
686     return pollDisabled.get();
687   }
688 
689   @SuppressForbidden(reason = "Need currentTimeMillis, to output next execution time in replication details")
690   private void markScheduledExecutionStart() {
691     executorStartTime = System.currentTimeMillis();
692   }
693 
694   private Date getNextScheduledExecTime() {
695     Date nextTime = null;
696     if (executorStartTime > 0)
697       nextTime = new Date(executorStartTime + TimeUnit.MILLISECONDS.convert(pollIntervalNs, TimeUnit.NANOSECONDS));
698     return nextTime;
699   }
700 
701   int getTimesReplicatedSinceStartup() {
702     return numTimesReplicated;
703   }
704 
705   void setTimesReplicatedSinceStartup() {
706     numTimesReplicated++;
707   }
708 
709   long getIndexSize() {
710     Directory dir;
711     long size = 0;
712     try {
713       dir = core.getDirectoryFactory().get(core.getIndexDir(), DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
714       try {
715         size = DirectoryFactory.sizeOfDirectory(dir);
716       } finally {
717         core.getDirectoryFactory().release(dir);
718       }
719     } catch (IOException e) {
720       SolrException.log(LOG, "IO error while trying to get the size of the Directory", e);
721     }
722     return size;
723   }
724 
725   @Override
726   public String getDescription() {
727     return "ReplicationHandler provides replication of index and configuration files from Master to Slaves";
728   }
729 
730   /**
731    * returns the CommitVersionInfo for the current searcher, or null on error.
732    */
733   private CommitVersionInfo getIndexVersion() {
734     CommitVersionInfo v = null;
735     RefCounted<SolrIndexSearcher> searcher = core.getSearcher();
736     try {
737       v = CommitVersionInfo.build(searcher.get().getIndexReader().getIndexCommit());
738     } catch (IOException e) {
739       LOG.warn("Unable to get index commit: ", e);
740     } finally {
741       searcher.decref();
742     }
743     return v;
744   }
745 
746   @Override
747   @SuppressWarnings("unchecked")
748   public NamedList getStatistics() {
749     NamedList list = super.getStatistics();
750     if (core != null) {
751       list.add("indexSize", NumberUtils.readableSize(getIndexSize()));
752       CommitVersionInfo vInfo = (core != null && !core.isClosed()) ? getIndexVersion(): null;
753       list.add("indexVersion", null == vInfo ? 0 : vInfo.version);
754       list.add(GENERATION, null == vInfo ? 0 : vInfo.generation);
755 
756       list.add("indexPath", core.getIndexDir());
757       list.add("isMaster", String.valueOf(isMaster));
758       list.add("isSlave", String.valueOf(isSlave));
759 
760       IndexFetcher fetcher = currentIndexFetcher;
761       if (fetcher != null) {
762         list.add(MASTER_URL, fetcher.getMasterUrl());
763         if (getPollInterval() != null) {
764           list.add(POLL_INTERVAL, getPollInterval());
765         }
766         list.add("isPollingDisabled", String.valueOf(isPollingDisabled()));
767         list.add("isReplicating", String.valueOf(isReplicating()));
768         long elapsed = fetcher.getReplicationTimeElapsed();
769         long val = fetcher.getTotalBytesDownloaded();
770         if (elapsed > 0) {
771           list.add("timeElapsed", elapsed);
772           list.add("bytesDownloaded", val);
773           list.add("downloadSpeed", val / elapsed);
774         }
775         Properties props = loadReplicationProperties();
776         addVal(list, IndexFetcher.PREVIOUS_CYCLE_TIME_TAKEN, props, Long.class);
777         addVal(list, IndexFetcher.INDEX_REPLICATED_AT, props, Date.class);
778         addVal(list, IndexFetcher.CONF_FILES_REPLICATED_AT, props, Date.class);
779         addVal(list, IndexFetcher.REPLICATION_FAILED_AT, props, Date.class);
780         addVal(list, IndexFetcher.TIMES_FAILED, props, Integer.class);
781         addVal(list, IndexFetcher.TIMES_INDEX_REPLICATED, props, Integer.class);
782         addVal(list, IndexFetcher.LAST_CYCLE_BYTES_DOWNLOADED, props, Long.class);
783         addVal(list, IndexFetcher.TIMES_CONFIG_REPLICATED, props, Integer.class);
784         addVal(list, IndexFetcher.CONF_FILES_REPLICATED, props, String.class);
785       }
786       if (isMaster) {
787         if (includeConfFiles != null) list.add("confFilesToReplicate", includeConfFiles);
788         list.add(REPLICATE_AFTER, getReplicateAfterStrings());
789         list.add("replicationEnabled", String.valueOf(replicationEnabled.get()));
790       }
791     }
792     return list;
793   }
794 
795   /**
796    * Used for showing statistics and progress information.
797    */
798   private NamedList<Object> getReplicationDetails(boolean showSlaveDetails) {
799     NamedList<Object> details = new SimpleOrderedMap<>();
800     NamedList<Object> master = new SimpleOrderedMap<>();
801     NamedList<Object> slave = new SimpleOrderedMap<>();
802 
803     details.add("indexSize", NumberUtils.readableSize(getIndexSize()));
804     details.add("indexPath", core.getIndexDir());
805     details.add(CMD_SHOW_COMMITS, getCommits());
806     details.add("isMaster", String.valueOf(isMaster));
807     details.add("isSlave", String.valueOf(isSlave));
808     CommitVersionInfo vInfo = getIndexVersion();
809     details.add("indexVersion", null == vInfo ? 0 : vInfo.version);
810     details.add(GENERATION, null == vInfo ? 0 : vInfo.generation);
811 
812     IndexCommit commit = indexCommitPoint;  // make a copy so it won't change
813 
814     if (isMaster) {
815       if (includeConfFiles != null) master.add(CONF_FILES, includeConfFiles);
816       master.add(REPLICATE_AFTER, getReplicateAfterStrings());
817       master.add("replicationEnabled", String.valueOf(replicationEnabled.get()));
818     }
819 
820     if (isMaster && commit != null) {
821       CommitVersionInfo repCommitInfo = CommitVersionInfo.build(commit);
822       master.add("replicableVersion", repCommitInfo.version);
823       master.add("replicableGeneration", repCommitInfo.generation);
824     }
825 
826     IndexFetcher fetcher = currentIndexFetcher;
827     if (fetcher != null) {
828       Properties props = loadReplicationProperties();
829       if (showSlaveDetails) {
830         try {
831           NamedList nl = fetcher.getDetails();
832           slave.add("masterDetails", nl.get(CMD_DETAILS));
833         } catch (Exception e) {
834           LOG.warn(
835               "Exception while invoking 'details' method for replication on master ",
836               e);
837           slave.add(ERR_STATUS, "invalid_master");
838         }
839       }
840       slave.add(MASTER_URL, fetcher.getMasterUrl());
841       if (getPollInterval() != null) {
842         slave.add(POLL_INTERVAL, getPollInterval());
843       }
844       Date nextScheduled = getNextScheduledExecTime();
845       if (nextScheduled != null && !isPollingDisabled()) {
846         slave.add(NEXT_EXECUTION_AT, nextScheduled.toString());
847       } else if (isPollingDisabled()) {
848         slave.add(NEXT_EXECUTION_AT, "Polling disabled");
849       }
850       addVal(slave, IndexFetcher.INDEX_REPLICATED_AT, props, Date.class);
851       addVal(slave, IndexFetcher.INDEX_REPLICATED_AT_LIST, props, List.class);
852       addVal(slave, IndexFetcher.REPLICATION_FAILED_AT_LIST, props, List.class);
853       addVal(slave, IndexFetcher.TIMES_INDEX_REPLICATED, props, Integer.class);
854       addVal(slave, IndexFetcher.CONF_FILES_REPLICATED, props, Integer.class);
855       addVal(slave, IndexFetcher.TIMES_CONFIG_REPLICATED, props, Integer.class);
856       addVal(slave, IndexFetcher.CONF_FILES_REPLICATED_AT, props, Integer.class);
857       addVal(slave, IndexFetcher.LAST_CYCLE_BYTES_DOWNLOADED, props, Long.class);
858       addVal(slave, IndexFetcher.TIMES_FAILED, props, Integer.class);
859       addVal(slave, IndexFetcher.REPLICATION_FAILED_AT, props, Date.class);
860       addVal(slave, IndexFetcher.PREVIOUS_CYCLE_TIME_TAKEN, props, Long.class);
861 
862       slave.add("currentDate", new Date().toString());
863       slave.add("isPollingDisabled", String.valueOf(isPollingDisabled()));
864       boolean isReplicating = isReplicating();
865       slave.add("isReplicating", String.valueOf(isReplicating));
866       if (isReplicating) {
867         try {
868           long bytesToDownload = 0;
869           List<String> filesToDownload = new ArrayList<>();
870           for (Map<String, Object> file : fetcher.getFilesToDownload()) {
871             filesToDownload.add((String) file.get(NAME));
872             bytesToDownload += (Long) file.get(SIZE);
873           }
874 
875           //get list of conf files to download
876           for (Map<String, Object> file : fetcher.getConfFilesToDownload()) {
877             filesToDownload.add((String) file.get(NAME));
878             bytesToDownload += (Long) file.get(SIZE);
879           }
880 
881           slave.add("filesToDownload", filesToDownload);
882           slave.add("numFilesToDownload", String.valueOf(filesToDownload.size()));
883           slave.add("bytesToDownload", NumberUtils.readableSize(bytesToDownload));
884 
885           long bytesDownloaded = 0;
886           List<String> filesDownloaded = new ArrayList<>();
887           for (Map<String, Object> file : fetcher.getFilesDownloaded()) {
888             filesDownloaded.add((String) file.get(NAME));
889             bytesDownloaded += (Long) file.get(SIZE);
890           }
891 
892           //get list of conf files downloaded
893           for (Map<String, Object> file : fetcher.getConfFilesDownloaded()) {
894             filesDownloaded.add((String) file.get(NAME));
895             bytesDownloaded += (Long) file.get(SIZE);
896           }
897 
898           Map<String, Object> currentFile = fetcher.getCurrentFile();
899           String currFile = null;
900           long currFileSize = 0, currFileSizeDownloaded = 0;
901           float percentDownloaded = 0;
902           if (currentFile != null) {
903             currFile = (String) currentFile.get(NAME);
904             currFileSize = (Long) currentFile.get(SIZE);
905             if (currentFile.containsKey("bytesDownloaded")) {
906               currFileSizeDownloaded = (Long) currentFile.get("bytesDownloaded");
907               bytesDownloaded += currFileSizeDownloaded;
908               if (currFileSize > 0)
909                 percentDownloaded = (currFileSizeDownloaded * 100) / currFileSize;
910             }
911           }
912           slave.add("filesDownloaded", filesDownloaded);
913           slave.add("numFilesDownloaded", String.valueOf(filesDownloaded.size()));
914 
915           long estimatedTimeRemaining = 0;
916 
917           Date replicationStartTimeStamp = fetcher.getReplicationStartTimeStamp();
918           if (replicationStartTimeStamp != null) {
919             slave.add("replicationStartTime", replicationStartTimeStamp.toString());
920           }
921           long elapsed = fetcher.getReplicationTimeElapsed();
922           slave.add("timeElapsed", String.valueOf(elapsed) + "s");
923 
924           if (bytesDownloaded > 0)
925             estimatedTimeRemaining = ((bytesToDownload - bytesDownloaded) * elapsed) / bytesDownloaded;
926           float totalPercent = 0;
927           long downloadSpeed = 0;
928           if (bytesToDownload > 0)
929             totalPercent = (bytesDownloaded * 100) / bytesToDownload;
930           if (elapsed > 0)
931             downloadSpeed = (bytesDownloaded / elapsed);
932           if (currFile != null)
933             slave.add("currentFile", currFile);
934           slave.add("currentFileSize", NumberUtils.readableSize(currFileSize));
935           slave.add("currentFileSizeDownloaded", NumberUtils.readableSize(currFileSizeDownloaded));
936           slave.add("currentFileSizePercent", String.valueOf(percentDownloaded));
937           slave.add("bytesDownloaded", NumberUtils.readableSize(bytesDownloaded));
938           slave.add("totalPercent", String.valueOf(totalPercent));
939           slave.add("timeRemaining", String.valueOf(estimatedTimeRemaining) + "s");
940           slave.add("downloadSpeed", NumberUtils.readableSize(downloadSpeed));
941         } catch (Exception e) {
942           LOG.error("Exception while writing replication details: ", e);
943         }
944       }
945     }
946 
947     if (isMaster)
948       details.add("master", master);
949     if (slave.size() > 0)
950       details.add("slave", slave);
951 
952     NamedList snapshotStats = snapShootDetails;
953     if (snapshotStats != null)
954       details.add(CMD_BACKUP, snapshotStats);
955 
956     return details;
957   }
958 
959   private void addVal(NamedList<Object> nl, String key, Properties props, Class clzz) {
960     String s = props.getProperty(key);
961     if (s == null || s.trim().length() == 0) return;
962     if (clzz == Date.class) {
963       try {
964         Long l = Long.parseLong(s);
965         nl.add(key, new Date(l).toString());
966       } catch (NumberFormatException e) {/*no op*/ }
967     } else if (clzz == List.class) {
968       String ss[] = s.split(",");
969       List<String> l = new ArrayList<>();
970       for (String s1 : ss) {
971         l.add(new Date(Long.valueOf(s1)).toString());
972       }
973       nl.add(key, l);
974     } else {
975       nl.add(key, s);
976     }
977 
978   }
979 
980   private List<String> getReplicateAfterStrings() {
981     List<String> replicateAfter = new ArrayList<>();
982     if (replicateOnCommit)
983       replicateAfter.add("commit");
984     if (replicateOnOptimize)
985       replicateAfter.add("optimize");
986     if (replicateOnStart)
987       replicateAfter.add("startup");
988     return replicateAfter;
989   }
990 
991   Properties loadReplicationProperties() {
992     Directory dir = null;
993     try {
994       try {
995         dir = core.getDirectoryFactory().get(core.getDataDir(),
996             DirContext.META_DATA, core.getSolrConfig().indexConfig.lockType);
997         IndexInput input;
998         try {
999           input = dir.openInput(
1000             IndexFetcher.REPLICATION_PROPERTIES, IOContext.DEFAULT);
1001         } catch (FileNotFoundException | NoSuchFileException e) {
1002           return new Properties();
1003         }
1004 
1005         try {
1006           final InputStream is = new PropertiesInputStream(input);
1007           Properties props = new Properties();
1008           props.load(new InputStreamReader(is, StandardCharsets.UTF_8));
1009           return props;
1010         } finally {
1011           input.close();
1012         }
1013       } finally {
1014         if (dir != null) {
1015           core.getDirectoryFactory().release(dir);
1016         }
1017       }
1018     } catch (IOException e) {
1019       throw new SolrException(ErrorCode.SERVER_ERROR, e);
1020     }
1021   }
1022 
1023 
1024 //  void refreshCommitpoint() {
1025 //    IndexCommit commitPoint = core.getDeletionPolicy().getLatestCommit();
1026 //    if(replicateOnCommit || (replicateOnOptimize && commitPoint.getSegmentCount() == 1)) {
1027 //      indexCommitPoint = commitPoint;
1028 //    }
1029 //  }
1030 
1031   private void setupPolling(String intervalStr) {
1032     pollIntervalStr = intervalStr;
1033     pollIntervalNs = readIntervalNs(pollIntervalStr);
1034     if (pollIntervalNs == null || pollIntervalNs <= 0) {
1035       LOG.info(" No value set for 'pollInterval'. Timer Task not started.");
1036       return;
1037     }
1038 
1039     Runnable task = new Runnable() {
1040       @Override
1041       public void run() {
1042         if (pollDisabled.get()) {
1043           LOG.info("Poll disabled");
1044           return;
1045         }
1046         try {
1047           LOG.debug("Polling for index modifications");
1048           markScheduledExecutionStart();
1049           doFetch(null, false);
1050         } catch (Exception e) {
1051           LOG.error("Exception in fetching index", e);
1052         }
1053       }
1054     };
1055     executorService = Executors.newSingleThreadScheduledExecutor(
1056         new DefaultSolrThreadFactory("indexFetcher"));
1057     // Randomize initial delay, with a minimum of 1ms
1058     long initialDelayNs = new Random().nextLong() % pollIntervalNs
1059         + TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
1060     executorService.scheduleAtFixedRate(task, initialDelayNs, pollIntervalNs, TimeUnit.NANOSECONDS);
1061     LOG.info("Poll scheduled at an interval of {}ms",
1062         TimeUnit.MILLISECONDS.convert(pollIntervalNs, TimeUnit.NANOSECONDS));
1063   }
1064 
1065   @Override
1066   @SuppressWarnings("unchecked")
1067   public void inform(SolrCore core) {
1068     this.core = core;
1069     registerCloseHook();
1070     Object nbtk = initArgs.get(NUMBER_BACKUPS_TO_KEEP_INIT_PARAM);
1071     if(nbtk!=null) {
1072       numberBackupsToKeep = Integer.parseInt(nbtk.toString());
1073     } else {
1074       numberBackupsToKeep = 0;
1075     }
1076     NamedList slave = (NamedList) initArgs.get("slave");
1077     boolean enableSlave = isEnabled( slave );
1078     if (enableSlave) {
1079       currentIndexFetcher = pollingIndexFetcher = new IndexFetcher(slave, this, core);
1080       setupPolling((String) slave.get(POLL_INTERVAL));
1081       isSlave = true;
1082     }
1083     NamedList master = (NamedList) initArgs.get("master");
1084     boolean enableMaster = isEnabled( master );
1085 
1086     if (enableMaster || enableSlave) {
1087       if (core.getCoreDescriptor().getCoreContainer().getZkController() != null) {
1088         LOG.warn("SolrCloud is enabled for core " + core.getName() + " but so is old-style replication. Make sure you" +
1089             " intend this behavior, it usually indicates a mis-configuration. Master setting is " +
1090             Boolean.toString(enableMaster) + " and slave setting is " + Boolean.toString(enableSlave));
1091       }
1092     }
1093 
1094     if (!enableSlave && !enableMaster) {
1095       enableMaster = true;
1096       master = new NamedList<>();
1097     }
1098 
1099     if (enableMaster) {
1100       includeConfFiles = (String) master.get(CONF_FILES);
1101       if (includeConfFiles != null && includeConfFiles.trim().length() > 0) {
1102         List<String> files = Arrays.asList(includeConfFiles.split(","));
1103         for (String file : files) {
1104           if (file.trim().length() == 0) continue;
1105           String[] strs = file.trim().split(":");
1106           // if there is an alias add it or it is null
1107           confFileNameAlias.add(strs[0], strs.length > 1 ? strs[1] : null);
1108         }
1109         LOG.info("Replication enabled for following config files: " + includeConfFiles);
1110       }
1111       List backup = master.getAll("backupAfter");
1112       boolean backupOnCommit = backup.contains("commit");
1113       boolean backupOnOptimize = !backupOnCommit && backup.contains("optimize");
1114       List replicateAfter = master.getAll(REPLICATE_AFTER);
1115       replicateOnCommit = replicateAfter.contains("commit");
1116       replicateOnOptimize = !replicateOnCommit && replicateAfter.contains("optimize");
1117 
1118       if (!replicateOnCommit && ! replicateOnOptimize) {
1119         replicateOnCommit = true;
1120       }
1121 
1122       // if we only want to replicate on optimize, we need the deletion policy to
1123       // save the last optimized commit point.
1124       if (replicateOnOptimize) {
1125         IndexDeletionPolicyWrapper wrapper = core.getDeletionPolicy();
1126         IndexDeletionPolicy policy = wrapper == null ? null : wrapper.getWrappedDeletionPolicy();
1127         if (policy instanceof SolrDeletionPolicy) {
1128           SolrDeletionPolicy solrPolicy = (SolrDeletionPolicy)policy;
1129           if (solrPolicy.getMaxOptimizedCommitsToKeep() < 1) {
1130             solrPolicy.setMaxOptimizedCommitsToKeep(1);
1131           }
1132         } else {
1133           LOG.warn("Replication can't call setMaxOptimizedCommitsToKeep on " + policy);
1134         }
1135       }
1136 
1137       if (replicateOnOptimize || backupOnOptimize) {
1138         core.getUpdateHandler().registerOptimizeCallback(getEventListener(backupOnOptimize, replicateOnOptimize));
1139       }
1140       if (replicateOnCommit || backupOnCommit) {
1141         replicateOnCommit = true;
1142         core.getUpdateHandler().registerCommitCallback(getEventListener(backupOnCommit, replicateOnCommit));
1143       }
1144       if (replicateAfter.contains("startup")) {
1145         replicateOnStart = true;
1146         RefCounted<SolrIndexSearcher> s = core.getNewestSearcher(false);
1147         try {
1148           DirectoryReader reader = s==null ? null : s.get().getIndexReader();
1149           if (reader!=null && reader.getIndexCommit() != null && reader.getIndexCommit().getGeneration() != 1L) {
1150             try {
1151               if(replicateOnOptimize){
1152                 Collection<IndexCommit> commits = DirectoryReader.listCommits(reader.directory());
1153                 for (IndexCommit ic : commits) {
1154                   if(ic.getSegmentCount() == 1){
1155                     if(indexCommitPoint == null || indexCommitPoint.getGeneration() < ic.getGeneration()) indexCommitPoint = ic;
1156                   }
1157                 }
1158               } else{
1159                 indexCommitPoint = reader.getIndexCommit();
1160               }
1161             } finally {
1162               // We don't need to save commit points for replication, the SolrDeletionPolicy
1163               // always saves the last commit point (and the last optimized commit point, if needed)
1164               /***
1165               if(indexCommitPoint != null){
1166                 core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getGeneration());
1167               }
1168               ***/
1169             }
1170           }
1171 
1172           // ensure the writer is init'd so that we have a list of commit points
1173           RefCounted<IndexWriter> iw = core.getUpdateHandler().getSolrCoreState().getIndexWriter(core);
1174           iw.decref();
1175 
1176         } catch (IOException e) {
1177           LOG.warn("Unable to get IndexCommit on startup", e);
1178         } finally {
1179           if (s!=null) s.decref();
1180         }
1181       }
1182       String reserve = (String) master.get(RESERVE);
1183       if (reserve != null && !reserve.trim().equals("")) {
1184         reserveCommitDuration = readIntervalMs(reserve);
1185       }
1186       LOG.info("Commits will be reserved for  " + reserveCommitDuration);
1187       isMaster = true;
1188     }
1189   }
1190 
1191   // check master or slave is enabled
1192   private boolean isEnabled( NamedList params ){
1193     if( params == null ) return false;
1194     Object enable = params.get( "enable" );
1195     if( enable == null ) return true;
1196     if( enable instanceof String )
1197       return StrUtils.parseBool( (String)enable );
1198     return Boolean.TRUE.equals( enable );
1199   }
1200 
1201   /**
1202    * register a closehook
1203    */
1204   private void registerCloseHook() {
1205     core.addCloseHook(new CloseHook() {
1206       @Override
1207       public void preClose(SolrCore core) {
1208         try {
1209           if (executorService != null) executorService.shutdown(); // we don't wait for shutdown - this can deadlock core reload
1210         } finally {
1211             if (pollingIndexFetcher != null) {
1212               pollingIndexFetcher.destroy();
1213             }
1214         }
1215         if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {
1216           currentIndexFetcher.destroy();
1217         }
1218       }
1219 
1220       @Override
1221       public void postClose(SolrCore core) {}
1222     });
1223 
1224     core.addCloseHook(new CloseHook() {
1225       @Override
1226       public void preClose(SolrCore core) {
1227         ExecutorUtil.shutdownAndAwaitTermination(restoreExecutor);
1228         if (restoreFuture != null) {
1229           restoreFuture.cancel(false);
1230         }
1231       }
1232 
1233       @Override
1234       public void postClose(SolrCore core) {}
1235     });
1236   }
1237 
1238   /**
1239    * Register a listener for postcommit/optimize
1240    *
1241    * @param snapshoot do a snapshoot
1242    * @param getCommit get a commitpoint also
1243    *
1244    * @return an instance of the eventlistener
1245    */
1246   private SolrEventListener getEventListener(final boolean snapshoot, final boolean getCommit) {
1247     return new SolrEventListener() {
1248       @Override
1249       public void init(NamedList args) {/*no op*/ }
1250 
1251       /**
1252        * This refreshes the latest replicateable index commit and optionally can create Snapshots as well
1253        */
1254       @Override
1255       public void postCommit() {
1256         IndexCommit currentCommitPoint = core.getDeletionPolicy().getLatestCommit();
1257 
1258         if (getCommit) {
1259           // IndexCommit oldCommitPoint = indexCommitPoint;
1260           indexCommitPoint = currentCommitPoint;
1261 
1262           // We don't need to save commit points for replication, the SolrDeletionPolicy
1263           // always saves the last commit point (and the last optimized commit point, if needed)
1264           /***
1265           if (indexCommitPoint != null) {
1266             core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getGeneration());
1267           }
1268           if(oldCommitPoint != null){
1269             core.getDeletionPolicy().releaseCommitPointAndExtendReserve(oldCommitPoint.getGeneration());
1270           }
1271           ***/
1272         }
1273         if (snapshoot) {
1274           try {            
1275             int numberToKeep = numberBackupsToKeep;
1276             if (numberToKeep < 1) {
1277               numberToKeep = Integer.MAX_VALUE;
1278             }            
1279             SnapShooter snapShooter = new SnapShooter(core, null, null);
1280             snapShooter.validateCreateSnapshot();
1281             snapShooter.createSnapAsync(currentCommitPoint, numberToKeep, ReplicationHandler.this);
1282           } catch (Exception e) {
1283             LOG.error("Exception while snapshooting", e);
1284           }
1285         }
1286       }
1287 
1288       @Override
1289       public void newSearcher(SolrIndexSearcher newSearcher, SolrIndexSearcher currentSearcher) { /*no op*/}
1290 
1291       @Override
1292       public void postSoftCommit() {
1293 
1294       }
1295     };
1296   }
1297 
1298   /**This class is used to read and send files in the lucene index
1299    *
1300    */
1301   private class DirectoryFileStream implements SolrCore.RawWriter {
1302     protected SolrParams params;
1303 
1304     protected FastOutputStream fos;
1305 
1306     protected Long indexGen;
1307     protected IndexDeletionPolicyWrapper delPolicy;
1308 
1309     protected String fileName;
1310     protected String cfileName;
1311     protected String sOffset;
1312     protected String sLen;
1313     protected String compress;
1314     protected boolean useChecksum;
1315 
1316     protected long offset = -1;
1317     protected int len = -1;
1318 
1319     protected Checksum checksum;
1320 
1321     private RateLimiter rateLimiter;
1322 
1323     byte[] buf;
1324 
1325     public DirectoryFileStream(SolrParams solrParams) {
1326       params = solrParams;
1327       delPolicy = core.getDeletionPolicy();
1328 
1329       fileName = params.get(FILE);
1330       cfileName = params.get(CONF_FILE_SHORT);
1331       sOffset = params.get(OFFSET);
1332       sLen = params.get(LEN);
1333       compress = params.get(COMPRESSION);
1334       useChecksum = params.getBool(CHECKSUM, false);
1335       indexGen = params.getLong(GENERATION, null);
1336       if (useChecksum) {
1337         checksum = new Adler32();
1338       }
1339       //No throttle if MAX_WRITE_PER_SECOND is not specified
1340       double maxWriteMBPerSec = params.getDouble(MAX_WRITE_PER_SECOND, Double.MAX_VALUE);
1341       rateLimiter = new RateLimiter.SimpleRateLimiter(maxWriteMBPerSec);
1342     }
1343 
1344     protected void initWrite() throws IOException {
1345       if (sOffset != null) offset = Long.parseLong(sOffset);
1346       if (sLen != null) len = Integer.parseInt(sLen);
1347       if (fileName == null && cfileName == null) {
1348         // no filename do nothing
1349         writeNothingAndFlush();
1350       }
1351       buf = new byte[(len == -1 || len > PACKET_SZ) ? PACKET_SZ : len];
1352 
1353       //reserve commit point till write is complete
1354       if(indexGen != null) {
1355         delPolicy.saveCommitPoint(indexGen);
1356       }
1357     }
1358 
1359     protected void createOutputStream(OutputStream out) {
1360       if (Boolean.parseBoolean(compress)) {
1361         fos = new FastOutputStream(new DeflaterOutputStream(out));
1362       } else {
1363         fos = new FastOutputStream(out);
1364       }
1365     }
1366 
1367     protected void extendReserveAndReleaseCommitPoint() {
1368       if(indexGen != null) {
1369         //Reserve the commit point for another 10s for the next file to be to fetched.
1370         //We need to keep extending the commit reservation between requests so that the replica can fetch
1371         //all the files correctly.
1372         delPolicy.setReserveDuration(indexGen, reserveCommitDuration);
1373 
1374         //release the commit point as the write is complete
1375         delPolicy.releaseCommitPoint(indexGen);
1376       }
1377 
1378     }
1379     public void write(OutputStream out) throws IOException {
1380       createOutputStream(out);
1381 
1382       IndexInput in = null;
1383       try {
1384         initWrite();
1385 
1386         RefCounted<SolrIndexSearcher> sref = core.getSearcher();
1387         Directory dir;
1388         try {
1389           SolrIndexSearcher searcher = sref.get();
1390           dir = searcher.getIndexReader().directory();
1391         } finally {
1392           sref.decref();
1393         }
1394         in = dir.openInput(fileName, IOContext.READONCE);
1395         // if offset is mentioned move the pointer to that point
1396         if (offset != -1) in.seek(offset);
1397         
1398         long filelen = dir.fileLength(fileName);
1399         long maxBytesBeforePause = 0;
1400 
1401         while (true) {
1402           offset = offset == -1 ? 0 : offset;
1403           int read = (int) Math.min(buf.length, filelen - offset);
1404           in.readBytes(buf, 0, read);
1405 
1406           fos.writeInt(read);
1407           if (useChecksum) {
1408             checksum.reset();
1409             checksum.update(buf, 0, read);
1410             fos.writeLong(checksum.getValue());
1411           }
1412           fos.write(buf, 0, read);
1413           fos.flush();
1414           LOG.debug("Wrote {} bytes for file {}", offset + read, fileName);
1415 
1416           //Pause if necessary
1417           maxBytesBeforePause += read;
1418           if (maxBytesBeforePause >= rateLimiter.getMinPauseCheckBytes()) {
1419             rateLimiter.pause(maxBytesBeforePause);
1420             maxBytesBeforePause = 0;
1421           }
1422           if (read != buf.length) {
1423             writeNothingAndFlush();
1424             fos.close();
1425             break;
1426           }
1427           offset += read;
1428           in.seek(offset);
1429         }
1430       } catch (IOException e) {
1431         LOG.warn("Exception while writing response for params: " + params, e);
1432       } finally {
1433         if (in != null) {
1434           in.close();
1435         }
1436         extendReserveAndReleaseCommitPoint();
1437       }
1438     }
1439 
1440 
1441     /**
1442      * Used to write a marker for EOF
1443      */
1444     protected void writeNothingAndFlush() throws IOException {
1445       fos.writeInt(0);
1446       fos.flush();
1447     }
1448   }
1449 
1450   /**This is used to write files in the conf directory.
1451    */
1452   private class LocalFsFileStream extends DirectoryFileStream {
1453 
1454     public LocalFsFileStream(SolrParams solrParams) {
1455       super(solrParams);
1456     }
1457 
1458     @Override
1459     public void write(OutputStream out) throws IOException {
1460       createOutputStream(out);
1461       FileInputStream inputStream = null;
1462       try {
1463         initWrite();
1464 
1465         //if if is a conf file read from config directory
1466         File file = new File(core.getResourceLoader().getConfigDir(), cfileName);
1467 
1468         if (file.exists() && file.canRead()) {
1469           inputStream = new FileInputStream(file);
1470           FileChannel channel = inputStream.getChannel();
1471           //if offset is mentioned move the pointer to that point
1472           if (offset != -1)
1473             channel.position(offset);
1474           ByteBuffer bb = ByteBuffer.wrap(buf);
1475 
1476           while (true) {
1477             bb.clear();
1478             long bytesRead = channel.read(bb);
1479             if (bytesRead <= 0) {
1480               writeNothingAndFlush();
1481               fos.close();
1482               break;
1483             }
1484             fos.writeInt((int) bytesRead);
1485             if (useChecksum) {
1486               checksum.reset();
1487               checksum.update(buf, 0, (int) bytesRead);
1488               fos.writeLong(checksum.getValue());
1489             }
1490             fos.write(buf, 0, (int) bytesRead);
1491             fos.flush();
1492           }
1493         } else {
1494           writeNothingAndFlush();
1495         }
1496       } catch (IOException e) {
1497         LOG.warn("Exception while writing response for params: " + params, e);
1498       } finally {
1499         IOUtils.closeQuietly(inputStream);
1500         extendReserveAndReleaseCommitPoint();
1501       }
1502     }
1503   }
1504 
1505   private static Integer readIntervalMs(String interval) {
1506     return (int) TimeUnit.MILLISECONDS.convert(readIntervalNs(interval), TimeUnit.NANOSECONDS);
1507   }
1508 
1509   private static Long readIntervalNs(String interval) {
1510     if (interval == null)
1511       return null;
1512     int result = 0;
1513     Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
1514     if (m.find()) {
1515       String hr = m.group(1);
1516       String min = m.group(2);
1517       String sec = m.group(3);
1518       result = 0;
1519       try {
1520         if (sec != null && sec.length() > 0)
1521           result += Integer.parseInt(sec);
1522         if (min != null && min.length() > 0)
1523           result += (60 * Integer.parseInt(min));
1524         if (hr != null && hr.length() > 0)
1525           result += (60 * 60 * Integer.parseInt(hr));
1526         return TimeUnit.NANOSECONDS.convert(result, TimeUnit.SECONDS);
1527       } catch (NumberFormatException e) {
1528         throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG);
1529       }
1530     } else {
1531       throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG);
1532     }
1533   }
1534 
1535   private static final String LOCATION = "location";
1536 
1537   private static final String SUCCESS = "success";
1538 
1539   private static final String FAILED = "failed";
1540 
1541   private static final String EXCEPTION = "exception";
1542 
1543   public static final String MASTER_URL = "masterUrl";
1544 
1545   public static final String STATUS = "status";
1546 
1547   public static final String COMMAND = "command";
1548 
1549   public static final String CMD_DETAILS = "details";
1550 
1551   public static final String CMD_BACKUP = "backup";
1552 
1553   public static final String CMD_RESTORE = "restore";
1554 
1555   public static final String CMD_RESTORE_STATUS = "restorestatus";
1556 
1557   public static final String CMD_FETCH_INDEX = "fetchindex";
1558 
1559   public static final String CMD_ABORT_FETCH = "abortfetch";
1560 
1561   public static final String CMD_GET_FILE_LIST = "filelist";
1562 
1563   public static final String CMD_GET_FILE = "filecontent";
1564 
1565   public static final String CMD_DISABLE_POLL = "disablepoll";
1566 
1567   public static final String CMD_DISABLE_REPL = "disablereplication";
1568 
1569   public static final String CMD_ENABLE_REPL = "enablereplication";
1570 
1571   public static final String CMD_ENABLE_POLL = "enablepoll";
1572 
1573   public static final String CMD_INDEX_VERSION = "indexversion";
1574 
1575   public static final String CMD_SHOW_COMMITS = "commits";
1576 
1577   public static final String CMD_DELETE_BACKUP = "deletebackup";
1578 
1579   public static final String GENERATION = "generation";
1580 
1581   public static final String OFFSET = "offset";
1582 
1583   public static final String LEN = "len";
1584 
1585   public static final String FILE = "file";
1586 
1587   public static final String SIZE = "size";
1588 
1589   public static final String MAX_WRITE_PER_SECOND = "maxWriteMBPerSec";
1590 
1591   public static final String CONF_FILE_SHORT = "cf";
1592 
1593   public static final String CHECKSUM = "checksum";
1594 
1595   public static final String ALIAS = "alias";
1596 
1597   public static final String CONF_CHECKSUM = "confchecksum";
1598 
1599   public static final String CONF_FILES = "confFiles";
1600 
1601   public static final String REPLICATE_AFTER = "replicateAfter";
1602 
1603   public static final String FILE_STREAM = "filestream";
1604 
1605   public static final String POLL_INTERVAL = "pollInterval";
1606 
1607   public static final String INTERVAL_ERR_MSG = "The " + POLL_INTERVAL + " must be in this format 'HH:mm:ss'";
1608 
1609   private static final Pattern INTERVAL_PATTERN = Pattern.compile("(\\d*?):(\\d*?):(\\d*)");
1610 
1611   public static final int PACKET_SZ = 1024 * 1024; // 1MB
1612 
1613   public static final String RESERVE = "commitReserveDuration";
1614 
1615   public static final String COMPRESSION = "compression";
1616 
1617   public static final String EXTERNAL = "external";
1618 
1619   public static final String INTERNAL = "internal";
1620 
1621   public static final String ERR_STATUS = "ERROR";
1622 
1623   public static final String OK_STATUS = "OK";
1624 
1625   public static final String NEXT_EXECUTION_AT = "nextExecutionAt";
1626   
1627   public static final String NUMBER_BACKUPS_TO_KEEP_REQUEST_PARAM = "numberToKeep";
1628   
1629   public static final String NUMBER_BACKUPS_TO_KEEP_INIT_PARAM = "maxNumberOfBackups";
1630 
1631   /** 
1632    * Boolean param for tests that can be specified when using 
1633    * {@link #CMD_FETCH_INDEX} to force the current request to block until 
1634    * the fetch is complete.  <b>NOTE:</b> This param is not advised for 
1635    * non-test code, since the the duration of the fetch for non-trivial
1636    * indexes will likeley cause the request to time out.
1637    *
1638    * @lucene.internal
1639    */
1640   public static final String WAIT = "wait";
1641 }